# Copyright 2023 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Dict, List

from google_cloud_pipeline_components import _image
from google_cloud_pipeline_components import _placeholders
from kfp.dsl import ConcatPlaceholder
from kfp.dsl import container_component
from kfp.dsl import ContainerSpec
from kfp.dsl import OutputPath


# NOTE(review): this is a KFP container component — the decorator compiles the
# signature and the returned ContainerSpec into component IR; the function body
# is never executed at pipeline runtime. The mutable default arguments
# ({} / []) are the KFP convention for compiled defaults and must not be
# replaced with None-sentinels.
@container_component
def dataflow_flex_template(
    container_spec_gcs_path: str,
    gcp_resources: OutputPath(str),
    location: str = 'us-central1',
    job_name: str = '',
    parameters: Dict[str, str] = {},
    launch_options: Dict[str, str] = {},
    num_workers: int = 0,
    max_workers: int = 0,
    service_account_email: str = '',
    temp_location: str = '',
    machine_type: str = '',
    additional_experiments: List[str] = [],
    network: str = '',
    subnetwork: str = '',
    additional_user_labels: Dict[str, str] = {},
    kms_key_name: str = '',
    ip_configuration: str = '',
    worker_region: str = '',
    worker_zone: str = '',
    enable_streaming_engine: bool = False,
    flexrs_goal: str = '',
    staging_location: str = '',
    sdk_container_image: str = '',
    disk_size_gb: int = 0,
    autoscaling_algorithm: str = '',
    dump_heap_on_oom: bool = False,
    save_heap_dumps_to_gcs_path: str = '',
    launcher_machine_type: str = '',
    enable_launcher_vm_serial_port_logging: bool = False,
    update: bool = False,
    transform_name_mappings: Dict[str, str] = {},
    validate_only: bool = False,
    project: str = _placeholders.PROJECT_ID_PLACEHOLDER,
):
  # fmt: off
  """Launch a job with a Dataflow Flex Template.

  Args:
    location: The regional endpoint to which to direct the request. E.g., us-central1, us-west1. Defaults to `us-central1` if not set.
    job_name: The job name to use for the created job. For update job requests, the job name should be the same as the existing running job. If none is specified, a default name will be generated by the component.
    container_spec_gcs_path: Cloud Storage path to a file with json serialized ContainerSpec as content.
    parameters: The parameters for the flex template. Ex. {"my_template_param":"5"}
    launch_options: Launch options for this flex template job. This is a common set of options across languages and templates. This should not be used to pass job parameters.
    num_workers: The initial number of Google Compute Engine instances for the job. If empty or unspecified, the Dataflow service determines an appropriate number of workers.
    max_workers: The maximum number of Google Compute Engine instances to be made available to your pipeline during execution, from 1 to 1000. If empty or unspecified, the Dataflow service determines a default maximum number of instances. For more details, see https://cloud.google.com/dataflow/docs/horizontal-autoscaling.
    service_account_email: The email address of the service account to run the job as. If unspecified, the Dataflow service uses the project's Compute Engine default service account.
    temp_location: The Cloud Storage path to use for temporary files. Must be a valid Cloud Storage URL, beginning with gs://. For more details, see https://cloud.google.com/dataflow/docs/guides/setting-pipeline-options#setting_required_options.
    machine_type: The machine type to use for the Dataflow job. Defaults to the value from the template if not specified.
    additional_experiments: Additional experiment flags for the job.
    network: Network to which VMs will be assigned. If empty or unspecified, the service will use the network "default".
    subnetwork: Subnetwork to which VMs will be assigned, if desired. You can specify a subnetwork using either a complete URL or an abbreviated path. Expected to be of the form "https://www.googleapis.com/compute/v1/projects/HOST_PROJECT_ID/regions/REGION/subnetworks/SUBNETWORK" or "regions/REGION/subnetworks/SUBNETWORK". If the subnetwork is located in a Shared VPC network, you must use the complete URL.
    additional_user_labels: Additional user labels to be specified for the job. Keys and values must follow the restrictions specified in the labeling restrictions page (https://cloud.google.com/compute/docs/labeling-resources#restrictions). An object containing a list of "key": value pairs. Example: `{ "name": "wrench", "mass": "1kg", "count": "3" }`.
    kms_key_name: Name for the Cloud KMS key for the job. Key format is "projects/HOST_PROJECT_ID/locations/LOCATION/keyRings/KEYRING_ID/cryptoKeys/CRYPTO_KEY_ID"
    ip_configuration: Configuration for VM IPs.
    worker_region: The Compute Engine region (https://cloud.google.com/compute/docs/regions-zones/regions-zones) in which worker processing should occur, e.g. "us-west1". Mutually exclusive with worker_zone. If neither worker_region nor worker_zone is specified, default to the control plane's region.
    worker_zone: The Compute Engine zone (https://cloud.google.com/compute/docs/regions-zones/regions-zones) in which worker processing should occur, e.g. "us-west1-a". Mutually exclusive with workerRegion. If neither worker_region nor worker_zone is specified, a zone in the control plane's region is chosen based on available capacity.
    enable_streaming_engine: Whether to enable Streaming Engine for the job.
    flexrs_goal: Set FlexRS goal for the job. For more details, see https://cloud.google.com/dataflow/docs/guides/flexrs.
    staging_location: The Cloud Storage path for staging local files. Must be a valid Cloud Storage URL, beginning with gs://. For more details, see https://cloud.google.com/dataflow/docs/guides/setting-pipeline-options#setting_required_options.
    sdk_container_image: Docker registry location (e.g. Artifact Registry) of the container image to use for the worker harness. Default is the container for the version of the SDK. Note this field is only valid for portable Dataflow pipeline jobs.
    disk_size_gb: Worker disk size, in gigabytes. If empty or unspecified, the Dataflow service determines an appropriate disk size.
    autoscaling_algorithm: The algorithm to use for autoscaling. If empty or unspecified, the Dataflow service sets a default value. For more details, see https://cloud.google.com/dataflow/docs/reference/pipeline-options#resource_utilization.
    dump_heap_on_oom: If true, when processing time is spent almost entirely on garbage collection (GC), saves a heap dump before ending the thread or process. If false, ends the thread or process without saving a heap dump. Does not save a heap dump when the Java Virtual Machine (JVM) has an out of memory error during processing. The location of the heap file is either echoed back to the user, or the user is given the opportunity to download the heap file.
    save_heap_dumps_to_gcs_path: Cloud Storage bucket (directory) to upload heap dumps to. Enabling this field implies that dump_heap_on_oom is set to true.
    launcher_machine_type: The machine type to use for launching the Dataflow job. The default is n1-standard-1.
    enable_launcher_vm_serial_port_logging: If true serial port logging will be enabled for the launcher VM.
    update: Set this to true if you are sending a request to update a running streaming job. When set, the job name should be the same as the running job.
    transform_name_mappings: Use this to pass transformNameMappings for streaming update jobs. Example: `{"oldTransformName":"newTransformName",...}`. For more details, see https://cloud.google.com/dataflow/docs/guides/updating-a-pipeline#Mapping
    validate_only: If true, the request is validated but not actually executed. Defaults to false.
    project: The ID of the Cloud Platform project that the job belongs to. Defaults to the project in which the PipelineJob is run.

  Returns:
    gcp_resources: Serialized gcp_resources proto tracking the Dataflow job. For more details, see https://github.com/kubeflow/pipelines/blob/master/components/google-cloud/google_cloud_pipeline_components/proto/README.md.
  """
  # fmt: on
  # Delegates the actual API call to the GCPC remote-launcher container; this
  # spec only describes the image, entrypoint, and argument placeholders.
  return ContainerSpec(
      image=_image.GCPC_IMAGE_TAG,
      command=[
          'python3',
          '-u',
          '-m',
          'google_cloud_pipeline_components.container.preview.dataflow.flex_template.launcher',
      ],
      args=[
          '--type',
          'DataflowJob',
          '--project',
          project,
          '--location',
          location,
          # --payload is a JSON request body assembled at pipeline runtime via
          # ConcatPlaceholder. Structure (brace-matched below):
          #   { "launch_parameter": { ...,
          #       "environment": { ... } ,   # closed before "update"... wait,
          #       "update": ..., "transform_name_mappings": ... },
          #     "validate_only": ... }
          # String-typed inputs are wrapped in explicit '"' fragments;
          # dict/list/int/bool inputs are inserted bare and serialized by the
          # pipeline backend. Do not reorder fragments — the concatenation
          # order defines the serialized JSON.
          '--payload',
          ConcatPlaceholder([
              '{',
              '"launch_parameter": {',
              '"job_name": "',
              job_name,
              '"',
              ', "container_spec_gcs_path": "',
              container_spec_gcs_path,
              '"',
              ', "parameters": ',
              parameters,
              ', "launch_options": ',
              launch_options,
              # Start of the nested "environment" object.
              ', "environment": {',
              '"num_workers": ',
              num_workers,
              ', "max_workers": ',
              max_workers,
              ', "service_account_email": "',
              service_account_email,
              '"',
              ', "temp_location": "',
              temp_location,
              '"',
              ', "machine_type": "',
              machine_type,
              '"',
              ', "additional_experiments": ',
              additional_experiments,
              ', "network": "',
              network,
              '"',
              ', "subnetwork": "',
              subnetwork,
              '"',
              ', "additional_user_labels": ',
              additional_user_labels,
              ', "kms_key_name": "',
              kms_key_name,
              '"',
              ', "ip_configuration": "',
              ip_configuration,
              '"',
              ', "worker_region": "',
              worker_region,
              '"',
              ', "worker_zone": "',
              worker_zone,
              '"',
              ', "enable_streaming_engine": ',
              enable_streaming_engine,
              ', "flexrs_goal": "',
              flexrs_goal,
              '"',
              ', "staging_location": "',
              staging_location,
              '"',
              ', "sdk_container_image": "',
              sdk_container_image,
              '"',
              ', "disk_size_gb": ',
              disk_size_gb,
              ', "autoscaling_algorithm": "',
              autoscaling_algorithm,
              '"',
              ', "dump_heap_on_oom": ',
              dump_heap_on_oom,
              ', "save_heap_dumps_to_gcs_path": "',
              save_heap_dumps_to_gcs_path,
              '"',
              ', "launcher_machine_type": "',
              launcher_machine_type,
              '"',
              ', "enable_launcher_vm_serial_port_logging": ',
              enable_launcher_vm_serial_port_logging,
              # Closes the "environment" object.
              '}',
              ', "update": ',
              update,
              ', "transform_name_mappings": ',
              transform_name_mappings,
              # Closes the "launch_parameter" object; "validate_only" sits at
              # the top level of the request.
              '}',
              ', "validate_only": ',
              validate_only,
              # Closes the outermost request object.
              '}',
          ]),
          '--gcp_resources',
          gcp_resources,
      ],
  )
